package com.atguigu.app.func;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.bean.TableProcess;
import com.atguigu.common.GmallConfig;
import com.atguigu.utils.DruidDSUtil;
import com.atguigu.utils.JdbcUtil;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;

/**
 * Routes main-stream change records to dimension tables based on the
 * {@code table_process} configuration.
 *
 * <p>The configuration comes from two places:
 * <ul>
 *   <li>a snapshot loaded over JDBC in {@link #open(Configuration)}, so that main-stream
 *       records arriving before any broadcast element can still be matched;</li>
 *   <li>the broadcast stream, whose change records are written to broadcast state in
 *       {@link #processBroadcastElement(String, Context, Collector)}.</li>
 * </ul>
 * Broadcast state takes precedence over the preloaded snapshot when both contain a key.
 *
 * <p>For every configuration row the sink table is created (if absent) through the
 * Druid-pooled connection.
 */
public class DimTableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {

    // NOTE(review): the URL embeds host, database and credentials; consider moving it to
    // configuration (e.g. GmallConfig) instead of keeping it in source.
    private static final String CONFIG_JDBC_URL =
            "jdbc:mysql://hadoop102:3306/gmall-220718-config?user=root&password=000000&useUnicode=true&characterEncoding=utf8&serverTimeZone=Asia/Shanghai&useSSL=false";

    /** Main-stream operation types that are forwarded to the dimension sink. */
    private static final Set<String> FORWARDED_TYPES =
            new HashSet<>(Arrays.asList("insert", "update", "bootstrap-insert"));

    private final MapStateDescriptor<String, TableProcess> mapStateDescriptor;
    private DruidDataSource druidDataSource;
    /** Configuration snapshot loaded in open(), keyed by source table name. */
    private Map<String, TableProcess> tableProcessHashMap;

    public DimTableProcessFunction(MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
        this.mapStateDescriptor = mapStateDescriptor;
    }

    /**
     * Initializes the connection pool and preloads all {@code sink_type='dim'} configuration
     * rows, creating each sink table if it does not exist yet.
     *
     * @throws Exception if the configuration cannot be read or a table cannot be created
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        druidDataSource = DruidDSUtil.getDruidDataSource();
        tableProcessHashMap = new HashMap<>();

        // try-with-resources: the connection is released even when the query or the
        // table creation below throws.
        try (Connection connection = DriverManager.getConnection(CONFIG_JDBC_URL)) {
            // NOTE(review): the trailing 'true' presumably enables underscore-to-camelCase
            // column mapping in JdbcUtil.queryList -- confirm against JdbcUtil.
            List<TableProcess> tableProcesses = JdbcUtil.queryList(connection,
                    "select * from table_process where sink_type='dim'",
                    TableProcess.class,
                    true);

            for (TableProcess tableProcess : tableProcesses) {
                checkTable(tableProcess.getSinkTable(),
                        tableProcess.getSinkPk(),
                        tableProcess.getSinkColumns(),
                        tableProcess.getSinkExtend());

                tableProcessHashMap.put(tableProcess.getSourceTable(), tableProcess);
            }
        }
    }

    /**
     * Applies one configuration change record to broadcast state.
     *
     * <p>Sample value:
     * {"before":null,"after":{"source_table":"base_category3","sink_table":"dim_base_category3","sink_columns":"id,name,category2_id","sink_pk":"id","sink_extend":null},"source":{...,"table":"table_process",...},"op":"r","ts_ms":1669162876406,"transaction":null}
     *
     * @param value change record as a JSON string with "op", "before" and "after" fields
     */
    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {

        JSONObject jsonObject = JSONObject.parseObject(value);
        BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);

        if ("d".equals(jsonObject.getString("op"))) {
            // Delete: drop the entry from broadcast state and from the preloaded snapshot,
            // otherwise processElement would fall back to the stale snapshot entry.
            String key = jsonObject.getJSONObject("before").getString("source_table");
            broadcastState.remove(key);
            tableProcessHashMap.remove(key);
        } else {
            JSONObject after = jsonObject.getJSONObject("after");
            String key = after.getString("source_table");

            // 1. Parse the "after" image into a TableProcess object.
            TableProcess tableProcess = JSONObject.parseObject(jsonObject.getString("after"), TableProcess.class);

            // 2. Create the sink table if it does not exist.
            checkTable(tableProcess.getSinkTable(),
                    tableProcess.getSinkPk(),
                    tableProcess.getSinkColumns(),
                    tableProcess.getSinkExtend());

            // 3. Store the configuration in broadcast state (it takes precedence over the
            //    preloaded snapshot in processElement, so the snapshot is not updated here).
            broadcastState.put(key, tableProcess);
        }
    }

    /**
     * Creates the sink table if it does not exist, e.g.
     * {@code create table if not exists db.tn(id varchar primary key,name varchar,sex varchar) xxx}.
     *
     * @param sinkTable   sink table name
     * @param sinkPk      primary-key column; defaults to "id" when null or blank
     * @param sinkColumns comma-separated column names
     * @param sinkExtend  text appended after the column list; treated as empty when null
     * @throws IllegalArgumentException if {@code sinkColumns} is null or blank
     * @throws RuntimeException         if executing the statement fails (cause preserved)
     */
    private void checkTable(String sinkTable, String sinkPk, String sinkColumns, String sinkExtend) {
        String createTableSql = buildCreateTableSql(sinkTable, sinkPk, sinkColumns, sinkExtend);

        // Print the SQL statement.
        System.out.println(createTableSql);

        DruidPooledConnection connection = null;
        PreparedStatement preparedStatement = null;

        try {
            connection = druidDataSource.getConnection();
            preparedStatement = connection.prepareStatement(createTableSql);
            preparedStatement.execute();
        } catch (SQLException e) {
            // Keep the original SQLException as the cause so the failure can be diagnosed.
            throw new RuntimeException("建表:" + sinkTable + "失败！！", e);
        } finally {
            // Closing is best-effort: a close failure must not mask the outcome above.
            if (preparedStatement != null) {
                try {
                    preparedStatement.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }

            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /** Builds the "create table if not exists" statement; every column is typed varchar. */
    private static String buildCreateTableSql(String sinkTable, String sinkPk, String sinkColumns, String sinkExtend) {
        if (sinkColumns == null || sinkColumns.trim().isEmpty()) {
            throw new IllegalArgumentException("sink_columns is empty for sink table: " + sinkTable);
        }

        // Apply defaults for the optional fields.
        String primaryKey = (sinkPk == null || sinkPk.trim().isEmpty()) ? "id" : sinkPk.trim();
        String extend = (sinkExtend == null) ? "" : sinkExtend;

        StringBuilder sql = new StringBuilder("create table if not exists ")
                .append(GmallConfig.PHOENIX_DB)
                .append(".")
                .append(sinkTable)
                .append("(");

        // Split the column list, e.g. "id,name,sex".
        String[] fields = sinkColumns.split(",");
        for (int i = 0; i < fields.length; i++) {
            // Trim so that "id, name" still matches the primary key and yields clean names.
            String field = fields[i].trim();

            sql.append(field).append(" varchar");

            if (primaryKey.equals(field)) {
                sql.append(" primary key");
            }

            // Separate columns with "," except after the last one.
            if (i < fields.length - 1) {
                sql.append(",");
            }
        }

        return sql.append(")").append(extend).toString();
    }

    /**
     * Filters one main-stream record: records without a configuration, or whose type is not
     * insert / update / bootstrap-insert, are dropped; for the rest, columns not listed in the
     * configuration are removed from "data" and a "sink_table" field is added before emitting.
     *
     * <p>Sample value:
     * {"database":"gmall-220623-flink","table":"comment_info","type":"insert","ts":1669162958,"xid":1111,"xoffset":13941,"data":{"id":1595211185799847960,"user_id":119,...,"create_time":"2022-08-02 08:22:38","operate_time":null}}
     */
    @Override
    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {

        // 1. Look up the configuration: broadcast state first, preloaded snapshot as fallback.
        ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        String key = value.getString("table");

        TableProcess tableProcess = broadcastState.get(key);
        if (tableProcess == null) {
            tableProcess = tableProcessHashMap.get(key);
        }

        JSONObject data = value.getJSONObject("data");
        // NOTE(review): this prints every main-stream record; consider removing it or
        // switching to debug-level logging.
        System.out.println("data:" + data);

        if (tableProcess == null) {
            System.out.println("没有对应:" + key + "的配置信息！");
            return;
        }

        // 2. Row filter: only forward insert-like operations. Other types are dropped
        //    silently; they are not a "missing configuration" case.
        if (!FORWARDED_TYPES.contains(value.getString("type"))) {
            return;
        }

        // Column filter.
        filterColumns(data, tableProcess.getSinkColumns());

        // 3. Emit the filtered record tagged with its sink table.
        value.put("sink_table", tableProcess.getSinkTable());
        out.collect(value);
    }

    /**
     * Removes from {@code data} every entry whose key is not listed in {@code sinkColumns}.
     *
     * <p>Example: data {"id":"12","tm_name":"aaaa","logo_url":"xxx"} with sinkColumns
     * "id,tm_name" becomes {"id":"12","tm_name":"aaaa"}.
     *
     * @param data        record payload, modified in place
     * @param sinkColumns comma-separated column names to keep
     */
    private void filterColumns(JSONObject data, String sinkColumns) {
        // Trimmed names in a hash set: tolerant of "id, name" and O(1) per lookup.
        Set<String> keptColumns = new HashSet<>();
        for (String column : sinkColumns.split(",")) {
            keptColumns.add(column.trim());
        }

        data.entrySet().removeIf(entry -> !keptColumns.contains(entry.getKey()));
    }
}